/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.zookeeper.server.quorum;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
import org.apache.jute.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.apache.zookeeper.txn.TxnHeader;

/**
 * This class is the superclass of two of the three main actors in a ZK
 * ensemble: Followers and Observers. Both Followers and Observers share a good
 * deal of code, which is moved into Learner to avoid duplication.
 */
public class Learner {
	/**
	 * A proposal received from the leader during synchronization that has not
	 * been committed yet: the transaction header and its deserialized record.
	 */
	static class PacketInFlight {
		TxnHeader hdr;
		Record rec;
	}

	protected static final Logger LOG = LoggerFactory.getLogger(Learner.class);

	/**
	 * Value applied to TCP_NODELAY on the leader socket. Read once from the
	 * "follower.nodelay" system property; defaults to true.
	 */
	private static final boolean nodelay = System.getProperty("follower.nodelay", "true").equals("true");

	/** Number of connection attempts made by {@link #connectToLeader} before giving up. */
	private static final int CONNECT_ATTEMPTS = 5;

	static {
		LOG.info("TCP NoDelay set to: " + nodelay);
	}

	/** Buffered stream underneath {@link #leaderOs}; flushed explicitly by {@link #writePacket}. */
	protected BufferedOutputStream bufferedOutput;

	/** Archive used to read records from the leader socket. */
	protected InputArchive leaderIs;

	/** Archive used to write records to the leader socket. */
	protected OutputArchive leaderOs;
	/** the protocol version of the leader */
	protected int leaderProtocolVersion = 0x01;
	/** Connections waiting for a REVALIDATE answer from the leader, keyed by session id. */
	final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>();

	QuorumPeer self;

	/** Socket connected to the leader; (re)created by {@link #connectToLeader}. */
	protected Socket sock;
	LearnerZooKeeperServer zk;

	/**
	 * Establish a connection with the Leader found by findLeader. Makes up to 5
	 * attempts, sleeping one second after each failed one, before giving up. On
	 * success the input/output archives are set up on top of the new socket.
	 * 
	 * @param addr - the address of the Leader to connect to.
	 * @throws IOException          - if the socket connection fails on the 5th
	 *                              attempt
	 * @throws ConnectException     - subtype of IOException, listed for callers
	 * @throws InterruptedException - if interrupted while sleeping between
	 *                              attempts
	 */
	protected void connectToLeader(InetSocketAddress addr) throws IOException, ConnectException, InterruptedException {
		sock = new Socket();
		sock.setSoTimeout(self.tickTime * self.initLimit);
		for (int tries = 0; tries < CONNECT_ATTEMPTS; tries++) {
			try {
				sock.connect(addr, self.tickTime * self.syncLimit);
				sock.setTcpNoDelay(nodelay);
				break;
			} catch (IOException e) {
				if (tries == CONNECT_ATTEMPTS - 1) {
					LOG.error("Unexpected exception", e);
					throw e;
				} else {
					LOG.warn("Unexpected exception, tries=" + tries + ", connecting to " + addr, e);
					// The failed socket cannot be reused. Release it before replacing it:
					// it may even be connected if only setTcpNoDelay failed.
					closeFailedSocket(sock);
					sock = new Socket();
					sock.setSoTimeout(self.tickTime * self.initLimit);
				}
			}
			Thread.sleep(1000);
		}
		leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
		bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
		leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
	}

	/**
	 * Closes a socket whose connection attempt failed. A failure to close is only
	 * logged, because the caller is about to retry with a fresh socket anyway.
	 *
	 * @param failed the socket to release
	 */
	private static void closeFailedSocket(Socket failed) {
		try {
			failed.close();
		} catch (IOException ignored) {
			LOG.debug("Ignoring exception while closing failed socket", ignored);
		}
	}

	/**
	 * Returns the address of the node we think is the leader, i.e. the server in
	 * the current view whose id matches the id of our current vote.
	 *
	 * @return the leader's address, or null (after logging a warning) if no
	 *         server in the view has that id
	 */
	protected InetSocketAddress findLeader() {
		InetSocketAddress addr = null;
		// Find the leader by id
		Vote current = self.getCurrentVote();
		for (QuorumServer s : self.getView().values()) {
			if (s.id == current.getId()) {
				addr = s.addr;
				break;
			}
		}
		if (addr == null) {
			LOG.warn("Couldn't find the leader with id = " + current.getId());
		}
		return addr;
	}

	/**
	 * @return the number of session revalidations still awaiting an answer
	 */
	public int getPendingRevalidationsCount() {
		return pendingRevalidations.size();
	}

	/**
	 * Socket getter
	 * 
	 * @return the socket used to talk to the leader; may be null if
	 *         connectToLeader has not been called yet
	 */
	public Socket getSocket() {
		return sock;
	}

	/**
	 * Answers a ping: fills the packet's data with the server's touch snapshot
	 * (each entry written as a long key followed by an int value) and sends the
	 * packet back to the leader.
	 *
	 * @param qp the ping packet; its data is overwritten and it is sent back
	 * @throws IOException if writing to the leader fails
	 */
	protected void ping(QuorumPacket qp) throws IOException {
		// Send back the ping with our session data
		ByteArrayOutputStream bos = new ByteArrayOutputStream();
		DataOutputStream dos = new DataOutputStream(bos);
		HashMap<Long, Integer> touchTable = zk.getTouchSnapshot();
		for (Entry<Long, Integer> entry : touchTable.entrySet()) {
			dos.writeLong(entry.getKey());
			dos.writeInt(entry.getValue());
		}
		qp.setData(bos.toByteArray());
		writePacket(qp, true);
	}

	/**
	 * read a packet from the leader
	 *
	 * @param pp the packet to be instantiated
	 * @throws IOException if reading from the leader fails
	 */
	void readPacket(QuorumPacket pp) throws IOException {
		synchronized (leaderIs) {
			leaderIs.readRecord(pp, "packet");
		}
		if (LOG.isTraceEnabled()) {
			// Pings are traced under their own mask.
			long traceMask = (pp.getType() == Leader.PING) ? ZooTrace.SERVER_PING_TRACE_MASK
					: ZooTrace.SERVER_PACKET_TRACE_MASK;
			ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp);
		}
	}

	/**
	 * Once connected to the leader, perform the handshake protocol to establish a
	 * following / observing connection.
	 * <p>
	 * Sends a packet of the given type carrying our accepted epoch and a
	 * LearnerInfo payload, then reads the leader's answer. If the answer is
	 * LEADERINFO, the leader's protocol version is recorded and an ACKEPOCH is
	 * sent back; otherwise the answer must be NEWLEADER.
	 * 
	 * @param pktType the type of the first packet sent to the leader
	 * @return the zxid the Leader sends for synchronization purposes.
	 * @throws IOException if communication fails, if the leader's epoch is lower
	 *                     than our accepted epoch, or if the first packet is
	 *                     neither LEADERINFO nor NEWLEADER
	 */
	protected long registerWithLeader(int pktType) throws IOException {
		/*
		 * Send follower info, including last zxid and sid
		 */
		long lastLoggedZxid = self.getLastLoggedZxid();
		QuorumPacket qp = new QuorumPacket();
		qp.setType(pktType);
		qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));

		/*
		 * Add sid to payload
		 */
		// NOTE(review): 0x10000 is presumably this learner's protocol version - confirm
		LearnerInfo li = new LearnerInfo(self.getId(), 0x10000);
		ByteArrayOutputStream bsid = new ByteArrayOutputStream();
		BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
		boa.writeRecord(li, "LearnerInfo");
		qp.setData(bsid.toByteArray());

		writePacket(qp, true);
		readPacket(qp);
		final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
		if (qp.getType() == Leader.LEADERINFO) {
			// we are connected to a 1.0 server so accept the new epoch and read the next
			// packet
			leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
			byte epochBytes[] = new byte[4];
			final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
			if (newEpoch > self.getAcceptedEpoch()) {
				wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
				self.setAcceptedEpoch(newEpoch);
			} else if (newEpoch == self.getAcceptedEpoch()) {
				// since we have already acked an epoch equal to the leaders, we cannot ack
				// again, but we still need to send our lastZxid to the leader so that we can
				// sync with it if it does assume leadership of the epoch.
				// the -1 indicates that this reply should not count as an ack for the new epoch
				wrappedEpochBytes.putInt(-1);
			} else {
				throw new IOException(
						"Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
			}
			QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
			writePacket(ackNewEpoch, true);
			return ZxidUtils.makeZxid(newEpoch, 0);
		} else {
			if (newEpoch > self.getAcceptedEpoch()) {
				self.setAcceptedEpoch(newEpoch);
			}
			if (qp.getType() != Leader.NEWLEADER) {
				LOG.error("First packet should have been NEWLEADER");
				throw new IOException("First packet should have been NEWLEADER");
			}
			return qp.getZxid();
		}
	}

	/**
	 * send a request packet to the leader
	 * <p>
	 * The payload is the session id, cxid and type followed by the raw request
	 * bytes, if any. The request buffer is rewound before and after it is read.
	 *
	 * @param request the request from the client
	 * @throws IOException if writing to the leader fails
	 */
	void request(Request request) throws IOException {
		ByteArrayOutputStream baos = new ByteArrayOutputStream();
		DataOutputStream oa = new DataOutputStream(baos);
		oa.writeLong(request.sessionId);
		oa.writeInt(request.cxid);
		oa.writeInt(request.type);
		if (request.request != null) {
			request.request.rewind();
			int len = request.request.remaining();
			byte b[] = new byte[len];
			request.request.get(b);
			request.request.rewind();
			oa.write(b);
		}
		oa.close();
		QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
		writePacket(qp, true);
	}

	/**
	 * Handles the leader's answer to a session revalidation: reads the session id
	 * and validity flag from the packet data and, if a connection is still
	 * pending for that session, finishes its session initialization.
	 *
	 * @param qp the packet whose data holds a long session id followed by a
	 *           boolean validity flag
	 * @throws IOException if the packet data cannot be read
	 */
	protected void revalidate(QuorumPacket qp) throws IOException {
		ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
		DataInputStream dis = new DataInputStream(bis);
		long sessionId = dis.readLong();
		boolean valid = dis.readBoolean();
		ServerCnxn cnxn = pendingRevalidations.remove(sessionId);
		if (cnxn == null) {
			LOG.warn("Missing session 0x" + Long.toHexString(sessionId) + " for validation");
		} else {
			zk.finishSessionInit(cnxn, valid);
		}
		if (LOG.isTraceEnabled()) {
			ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
					"Session 0x" + Long.toHexString(sessionId) + " is valid: " + valid);
		}
	}

	/**
	 * Shuts down this learner: detaches the ZooKeeper server from the connection
	 * factory, closes all connections and shuts down the server if there is one.
	 */
	public void shutdown() {
		// set the zookeeper server to null
		self.cnxnFactory.setZooKeeperServer(null);
		// clear all the connections
		self.cnxnFactory.closeAll();
		// shutdown previous zookeeper
		if (zk != null) {
			zk.shutdown();
		}
	}

	/**
	 * Finally, synchronize our history with the Leader.
	 * <p>
	 * The first packet selects the sync mode (DIFF, SNAP or TRUNC). After that,
	 * packets are processed until UPTODATE arrives. Finally an ACK is sent, the
	 * socket timeout is switched to the sync limit and the server is started.
	 * The process exits with status 13 if the log cannot be truncated or the
	 * first packet has an unexpected type.
	 * 
	 * @param newLeaderZxid the zxid returned by registerWithLeader; its epoch
	 *                      becomes our current epoch
	 * @throws IOException          if communication fails, the snapshot signature
	 *                              is wrong, or a COMMIT arrives while no proposal
	 *                              is outstanding
	 * @throws InterruptedException declared for callers; not thrown directly by
	 *                              the code visible here
	 */
	protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException {
		QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
		QuorumPacket qp = new QuorumPacket();
		long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);

		readPacket(qp);
		LinkedList<Long> packetsCommitted = new LinkedList<Long>();
		LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
		synchronized (zk) {
			if (qp.getType() == Leader.DIFF) {
				LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));
			} else if (qp.getType() == Leader.SNAP) {
				LOG.info("Getting a snapshot from leader");
				// The leader is going to dump the database
				// clear our own database and read
				zk.getZKDatabase().clear();
				zk.getZKDatabase().deserializeSnapshot(leaderIs);
				String signature = leaderIs.readString("signature");
				if (!signature.equals("BenWasHere")) {
					LOG.error("Missing signature. Got " + signature);
					throw new IOException("Missing signature");
				}
			} else if (qp.getType() == Leader.TRUNC) {
				// we need to truncate the log to the lastzxid of the leader
				LOG.warn("Truncating log to get in sync with the leader 0x" + Long.toHexString(qp.getZxid()));
				boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
				if (!truncated) {
					// not able to truncate the log
					LOG.error("Not able to truncate the log " + Long.toHexString(qp.getZxid()));
					System.exit(13);
				}

			} else {
				LOG.error("Got unexpected packet from leader " + qp.getType() + " exiting ... ");
				System.exit(13);

			}
			zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
			zk.createSessionTracker();

			long lastQueued = 0;

			// in V1.0 we take a snapshot when we get the NEWLEADER message, but in pre V1.0
			// we take the snapshot at the UPTODATE, since V1.0 also gets the UPTODATE (after
			// the NEWLEADER)
			// we need to make sure that we don't take the snapshot twice.
			boolean snapshotTaken = false;
			// we are now going to start getting transactions to apply followed by an
			// UPTODATE
			outerLoop: while (self.isRunning()) {
				readPacket(qp);
				switch (qp.getType()) {
				case Leader.PROPOSAL:
					PacketInFlight pif = new PacketInFlight();
					pif.hdr = new TxnHeader();
					pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
					if (pif.hdr.getZxid() != lastQueued + 1) {
						LOG.warn("Got zxid 0x" + Long.toHexString(pif.hdr.getZxid()) + " expected 0x"
								+ Long.toHexString(lastQueued + 1));
					}
					lastQueued = pif.hdr.getZxid();
					packetsNotCommitted.add(pif);
					break;
				case Leader.COMMIT:
					if (!snapshotTaken) {
						pif = packetsNotCommitted.peekFirst();
						if (pif == null) {
							// peekFirst() returns null on an empty list. A COMMIT without a
							// preceding PROPOSAL is reported as an I/O error so the caller
							// can abandon this sync, rather than failing with a bare
							// NullPointerException.
							throw new IOException("Got COMMIT for 0x" + Long.toHexString(qp.getZxid())
									+ " but there is no outstanding proposal");
						}
						if (pif.hdr.getZxid() != qp.getZxid()) {
							LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
						} else {
							zk.processTxn(pif.hdr, pif.rec);
							packetsNotCommitted.remove();
						}
					} else {
						// After the snapshot, commits are only remembered here and are
						// replayed once the server has been started (see below).
						packetsCommitted.add(qp.getZxid());
					}
					break;
				case Leader.INFORM:
					TxnHeader hdr = new TxnHeader();
					Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
					zk.processTxn(hdr, txn);
					break;
				case Leader.UPTODATE:
					if (!snapshotTaken) { // true for the pre v1.0 case
						zk.takeSnapshot();
						self.setCurrentEpoch(newEpoch);
					}
					self.cnxnFactory.setZooKeeperServer(zk);
					break outerLoop;
				case Leader.NEWLEADER: // it will be NEWLEADER in v1.0
					zk.takeSnapshot();
					self.setCurrentEpoch(newEpoch);
					snapshotTaken = true;
					writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
					break;
				}
			}
		}
		ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
		writePacket(ack, true);
		sock.setSoTimeout(self.tickTime * self.syncLimit);
		zk.startup();
		// We need to log the stuff that came in between the snapshot and the uptodate
		if (zk instanceof FollowerZooKeeperServer) {
			FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
			for (PacketInFlight p : packetsNotCommitted) {
				fzk.logRequest(p.hdr, p.rec);
			}
			for (Long zxid : packetsCommitted) {
				fzk.commit(zxid);
			}
		}
	}

	/**
	 * validate a session for a client
	 * <p>
	 * Registers the connection as pending and sends a REVALIDATE packet to the
	 * leader; the answer is handled by {@link #revalidate(QuorumPacket)}.
	 *
	 * @param cnxn     the connection waiting for the outcome of the validation
	 * @param clientId the client to be revalidated
	 * @param timeout  the timeout for which the session is valid
	 * @throws IOException if writing to the leader fails
	 */
	void validateSession(ServerCnxn cnxn, long clientId, int timeout) throws IOException {
		LOG.info("Revalidating client: 0x" + Long.toHexString(clientId));
		ByteArrayOutputStream baos = new ByteArrayOutputStream();
		DataOutputStream dos = new DataOutputStream(baos);
		dos.writeLong(clientId);
		dos.writeInt(timeout);
		dos.close();
		QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos.toByteArray(), null);
		// Register before sending, so the connection is already pending when the
		// leader's answer is processed.
		pendingRevalidations.put(clientId, cnxn);
		if (LOG.isTraceEnabled()) {
			ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
					"To validate session 0x" + Long.toHexString(clientId));
		}
		writePacket(qp, true);
	}

	/**
	 * write a packet to the leader
	 *
	 * @param pp    the proposal packet to be sent to the leader; if null nothing
	 *              is written, but the stream may still be flushed
	 * @param flush whether to flush the buffered output after writing
	 * @throws IOException if writing or flushing fails
	 */
	void writePacket(QuorumPacket pp, boolean flush) throws IOException {
		synchronized (leaderOs) {
			if (pp != null) {
				leaderOs.writeRecord(pp, "packet");
			}
			if (flush) {
				bufferedOutput.flush();
			}
		}
	}
}
